package com.beau.filter;

import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Consumer-side Dubbo filter that records the response time of every invocation,
 * keyed by method name, and periodically logs the TP90 / TP99 response times
 * observed within the last minute.
 *
 * <p>Two background tasks are scheduled when the filter is constructed: one purges
 * expired records once per minute, the other logs the percentiles every five seconds.
 */
@Activate(group = CommonConstants.CONSUMER)
public class TPMonitorFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(TPMonitorFilter.class);

    /** Length of the sliding window, in milliseconds, that records are kept and reported for. */
    private static final long WINDOW_MILLIS = TimeUnit.MINUTES.toMillis(1);

    /** Response-time records per method name. */
    private final Map<String, CopyOnWriteArrayList<TPRecord>> tpRecordMap = new ConcurrentHashMap<>();

    /**
     * Creates the filter and starts the cleanup and reporting tasks.
     */
    public TPMonitorFilter() {
        // Daemon threads: the pool is never shut down, so it must not keep the JVM alive.
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "tp-monitor");
            thread.setDaemon(true);
            return thread;
        });
        // Purge expired records.
        executor.scheduleAtFixedRate(new ClearRecord(), 0, 1, TimeUnit.MINUTES);
        // Report percentiles.
        executor.scheduleAtFixedRate(new ReportRecord(), 0, 5, TimeUnit.SECONDS);
        LOG.info("TPMonitorFilter started");
    }

    /**
     * Delegates to the next invoker and records how long the call took, whether it
     * returned normally or threw.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the invocation being performed; its method name is the record key
     * @return the result produced by {@code invoker}
     * @throws RpcException if the delegated invocation throws it
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // nanoTime is monotonic, so the elapsed time is immune to wall-clock adjustments.
        long startNanos = System.nanoTime();
        String methodName = invocation.getMethodName();
        try {
            return invoker.invoke(invocation);
        } finally {
            int responseTime = (int) TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            // computeIfAbsent makes list creation atomic; a separate get/put pair lets two
            // threads each create a list for the same method and lose one of the records.
            tpRecordMap.computeIfAbsent(methodName, key -> new CopyOnWriteArrayList<>())
                    .add(new TPRecord(System.currentTimeMillis(), responseTime));
        }
    }

    /**
     * Logs the TP90 and TP99 of one method from a private, sorted snapshot of its records.
     *
     * @param method        the method name the records belong to
     * @param records       the live record list for that method
     * @param oldestAllowed records with a timestamp before this instant (epoch millis) are ignored
     */
    private void report(String method, List<TPRecord> records, long oldestAllowed) {
        // Work on a copy: the live list is appended to and purged concurrently, and the
        // purge runs only once per minute, so it may still hold records outside the window.
        List<TPRecord> snapshot = new ArrayList<>();
        for (TPRecord entry : records) {
            if (entry.timestamp >= oldestAllowed) {
                snapshot.add(entry);
            }
        }
        if (snapshot.isEmpty()) {
            // Nothing to report; indexing into an empty list would throw.
            return;
        }
        // Order by response time so that percentiles can be read by index.
        snapshot.sort(Comparator.comparingInt(entry -> entry.responseTime));
        LOG.info("tpRecord of {} size is {}, record is {}", method, snapshot.size(), snapshot);
        LOG.warn("tp90 of {} is {}ms", method, percentile(snapshot, 0.9));
        LOG.warn("tp99 of {} is {}ms", method, percentile(snapshot, 0.99));
    }

    /**
     * Returns the response time found at the given fraction of a non-empty list sorted by
     * response time.
     *
     * @param sorted   non-empty records in ascending response-time order
     * @param fraction the percentile as a fraction, e.g. {@code 0.9} for TP90
     * @return the response time in milliseconds at that position
     */
    private static int percentile(List<TPRecord> sorted, double fraction) {
        int index = Math.min((int) Math.floor(sorted.size() * fraction), sorted.size() - 1);
        return sorted.get(index).responseTime;
    }

    /**
     * One recorded invocation: when it completed and how long it took.
     */
    private static class TPRecord {
        /** Completion time in epoch milliseconds. */
        final long timestamp;
        /** Elapsed time of the invocation in milliseconds. */
        final int responseTime;

        public TPRecord(long timestamp, int responseTime) {
            this.timestamp = timestamp;
            this.responseTime = responseTime;
        }

        @Override
        public String toString() {
            return responseTime + "";
        }
    }

    /**
     * Scheduled task that drops records older than the reporting window.
     */
    private class ClearRecord implements Runnable {
        @Override
        public void run() {
            // An exception escaping a scheduleAtFixedRate task cancels all later runs,
            // so failures are logged instead of propagated.
            try {
                long current = System.currentTimeMillis();
                tpRecordMap.forEach((method, records) ->
                        records.removeIf(entry -> current - entry.timestamp > WINDOW_MILLIS));
                LOG.info("remove expired record");
            } catch (RuntimeException e) {
                LOG.error("failed to remove expired record", e);
            }
        }
    }

    /**
     * Scheduled task that logs the TP90 / TP99 of every recorded method.
     */
    private class ReportRecord implements Runnable {
        @Override
        public void run() {
            // An exception escaping a scheduleAtFixedRate task cancels all later runs,
            // so failures are logged instead of propagated.
            try {
                long oldestAllowed = System.currentTimeMillis() - WINDOW_MILLIS;
                tpRecordMap.forEach((method, records) -> report(method, records, oldestAllowed));
            } catch (RuntimeException e) {
                LOG.error("failed to report tp record", e);
            }
        }
    }
}
